package saic.kafka.stream

/**
 * @author ZhiLi
 */
import java.util.Properties
import scala.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import scala.util.Random
import org.json.JSONObject
 
/**
 * Simulated click-event generator that publishes JSON messages to a Kafka topic.
 *
 * Every message contains a user id (taken round-robin from a fixed list), the
 * current time in milliseconds rendered as a string, a constant OS type and a
 * random click count. One message is sent every [[SendIntervalMs]] milliseconds
 * until the process is stopped.
 */
object KafkaEventProducer {

  /** Topic used when no topic is passed on the command line. */
  private val DefaultTopic = "mykafka01"

  /** Broker list used when no broker list is passed on the command line. */
  // Previously used single-broker setup: "172.16.85.129:9092"
  private val DefaultBrokers = "10.122.17.129:9095,10.122.17.130:9095,10.122.17.131:9095"

  /** Pause between two consecutive messages, in milliseconds. */
  private val SendIntervalMs = 200L

  /** Fixed pool of user ids the generated events cycle through. */
  private val users = Array(
    "4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
    "011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
    "068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
    "d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
    "6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")

  private val random = new Random()

  /** Index of the most recently returned user; -1 means none has been returned yet. */
  private var pointer = -1

  /**
   * Returns the next user id, cycling through [[users]] in order and wrapping
   * around to the first entry after the last one.
   *
   * Advances the internal cursor on every call; the cursor is a plain `var`
   * with no synchronization.
   *
   * @return the next user id in round-robin order
   */
  def getUserID(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

  /**
   * Generates a random click count.
   *
   * @return a whole number in the range 0 to 9 (inclusive), as a `Double`
   */
  def click(): Double = random.nextInt(10).toDouble

  // Handy commands for working with the default topic:
  // bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic mykafka01 --replication-factor 1 --partitions 1
  // bin/kafka-topics.sh --zookeeper localhost:2181 --list
  // bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mykafka01
  // bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mykafka01 --from-beginning

  /**
   * Entry point: sends one JSON event to Kafka every [[SendIntervalMs]]
   * milliseconds, forever.
   *
   * @param args optional overrides: `args(0)` is the topic name and `args(1)`
   *             the comma-separated `host:port` broker list; when absent the
   *             built-in defaults are used
   */
  def main(args: Array[String]): Unit = {
    val topic = if (args.length > 0) args(0) else DefaultTopic
    val brokers = if (args.length > 1) args(1) else DefaultBrokers

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))

    try {
      while (true) {
        // prepare event data
        val event = new JSONObject()
          .put("uid", getUserID())
          .put("event_time", System.currentTimeMillis.toString)
          .put("os_type", "Android")
          .put("click_count", click())

        // produce event message
        // NOTE(review): every message uses the same constant key; if the
        // partitioner is key-based this pins all events to one partition —
        // confirm that is intended.
        producer.send(new KeyedMessage[String, String](topic, "key", event.toString))
        println("Message sent: " + event)

        Thread.sleep(SendIntervalMs)
      }
    } finally {
      // Release broker connections when the loop is left through an exception
      // (failed send, interrupted sleep). Called without parentheses so it
      // compiles whether or not the API declares `close` with a parameter list.
      producer.close
    }
  }
}